home *** CD-ROM | disk | FTP | other *** search
/ Enter 2006 September / Enter 09 2006.iso / Internet / SpamExperts Home 1.1 / SpamExperts Home.exe / lib / spamexperts.modules / ZODB / Connection.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2006-07-14  |  25.9 KB  |  879 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. '''Database connection support
  5.  
  6. $Id: Connection.py 40785 2005-12-14 21:49:51Z tim_one $'''
  7. import logging
  8. import sys
  9. import tempfile
  10. import threading
  11. import warnings
  12. from time import time
  13. from persistent import PickleCache
  14. from persistent.interfaces import IPersistentDataManager
  15. from ZODB.interfaces import IConnection
  16. from transaction.interfaces import ISavepointDataManager
  17. from transaction.interfaces import IDataManagerSavepoint
  18. from transaction.interfaces import ISynchronizer
  19. from zope.interface import implements
  20. import transaction
  21. from ZODB.ConflictResolution import ResolvedSerial
  22. from ZODB.ExportImport import ExportImport
  23. from ZODB import POSException
  24. from ZODB.POSException import InvalidObjectReference, ConnectionStateError
  25. from ZODB.POSException import ConflictError, ReadConflictError
  26. from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
  27. from ZODB.utils import p64, u64, z64, oid_repr, positive_id
  28. global_reset_counter = 0
  29.  
  30. def resetCaches():
  31.     """Causes all connection caches to be reset as connections are reopened.
  32.  
  33.     Zope's refresh feature uses this.  When you reload Python modules,
  34.     instances of classes continue to use the old class definitions.
  35.     To use the new code immediately, the refresh feature asks ZODB to
  36.     clear caches by calling resetCaches().  When the instances are
  37.     loaded by subsequent connections, they will use the new class
  38.     definitions.
  39.     """
  40.     global global_reset_counter
  41.     global_reset_counter += 1
  42.  
  43.  
  44. class Connection(ExportImport, object):
  45.     '''Connection to ZODB for loading and storing objects.'''
  46.     implements(IConnection, ISavepointDataManager, IPersistentDataManager, ISynchronizer)
  47.     _code_timestamp = 0
  48.     
  49.     def __init__(self, db, version = '', cache_size = 400):
  50.         '''Create a new Connection.'''
  51.         self._db = db
  52.         self._normal_storage = self._storage = db._storage
  53.         self.new_oid = db._storage.new_oid
  54.         self._savepoint_storage = None
  55.         self.transaction_manager = None
  56.         self._synch = None
  57.         self._mvcc = None
  58.         self._log = logging.getLogger('ZODB.Connection')
  59.         self._debug_info = ()
  60.         self._opened = None
  61.         self._version = version
  62.         self._cache = cache = PickleCache(self, cache_size)
  63.         if version:
  64.             self._cache.cache_drain_resistance = 100
  65.         
  66.         self._committed = []
  67.         self._added = { }
  68.         self._added_during_commit = None
  69.         self._reset_counter = global_reset_counter
  70.         self._load_count = 0
  71.         self._store_count = 0
  72.         self._creating = { }
  73.         self._modified = []
  74.         self._registered_objects = []
  75.         self._needs_to_join = True
  76.         self._inv_lock = threading.Lock()
  77.         self._invalidated = d = { }
  78.         self._conflicts = { }
  79.         self._txn_time = None
  80.         self._import = None
  81.         self._reader = ObjectReader(self, self._cache, self._db.classFactory)
  82.         self.connections = {
  83.             self._db.database_name: self }
  84.  
  85.     
  86.     def add(self, obj):
  87.         """Add a new object 'obj' to the database and assign it an oid."""
  88.         if self._opened is None:
  89.             raise ConnectionStateError('The database connection is closed')
  90.         
  91.         marker = object()
  92.         oid = getattr(obj, '_p_oid', marker)
  93.         if oid is marker:
  94.             raise TypeError('Only first-class persistent objects may be added to a Connection.', obj)
  95.         elif obj._p_jar is None:
  96.             if not obj._p_oid is None:
  97.                 raise AssertionError
  98.             oid = obj._p_oid = self._storage.new_oid()
  99.             obj._p_jar = self
  100.             if self._added_during_commit is not None:
  101.                 self._added_during_commit.append(obj)
  102.             
  103.             self._register(obj)
  104.             self._added[oid] = obj
  105.         elif obj._p_jar is not self:
  106.             raise InvalidObjectReference(obj, obj._p_jar)
  107.         
  108.  
  109.     
  110.     def get(self, oid):
  111.         """Return the persistent object with oid 'oid'."""
  112.         if self._opened is None:
  113.             raise ConnectionStateError('The database connection is closed')
  114.         
  115.         obj = self._cache.get(oid, None)
  116.         if obj is not None:
  117.             return obj
  118.         
  119.         obj = self._added.get(oid, None)
  120.         if obj is not None:
  121.             return obj
  122.         
  123.         (p, serial) = self._storage.load(oid, self._version)
  124.         obj = self._reader.getGhost(p)
  125.         obj._p_oid = oid
  126.         obj._p_jar = self
  127.         obj._p_changed = None
  128.         obj._p_serial = serial
  129.         self._cache[oid] = obj
  130.         return obj
  131.  
  132.     
  133.     def cacheMinimize(self):
  134.         '''Deactivate all unmodified objects in the cache.'''
  135.         self._cache.minimize()
  136.  
  137.     
  138.     def cacheGC(self):
  139.         '''Reduce cache size to target size.'''
  140.         self._cache.incrgc()
  141.  
  142.     __onCloseCallbacks = None
  143.     
  144.     def onCloseCallback(self, f):
  145.         '''Register a callable, f, to be called by close().'''
  146.         if self._Connection__onCloseCallbacks is None:
  147.             self._Connection__onCloseCallbacks = []
  148.         
  149.         self._Connection__onCloseCallbacks.append(f)
  150.  
  151.     
  152.     def close(self, primary = True):
  153.         '''Close the Connection.'''
  154.         if not self._needs_to_join:
  155.             raise ConnectionStateError('Cannot close a connection joined to a transaction')
  156.         
  157.         if self._cache is not None:
  158.             self._cache.incrgc()
  159.         
  160.         if self._Connection__onCloseCallbacks is not None:
  161.             for f in self._Connection__onCloseCallbacks:
  162.                 
  163.                 try:
  164.                     f()
  165.                 continue
  166.                 f = getattr(f, 'im_self', f)
  167.                 self._log.error('Close callback failed for %s', f, exc_info = sys.exc_info())
  168.                 continue
  169.  
  170.             
  171.             self._Connection__onCloseCallbacks = None
  172.         
  173.         self._debug_info = ()
  174.         if self._synch:
  175.             self.transaction_manager.unregisterSynch(self)
  176.             self._synch = None
  177.         
  178.         if primary:
  179.             for connection in self.connections.values():
  180.                 if connection is not self:
  181.                     connection.close(False)
  182.                     continue
  183.             
  184.             if self._opened is not None:
  185.                 self._db._returnToPool(self)
  186.             
  187.         else:
  188.             self._opened = None
  189.  
  190.     
  191.     def db(self):
  192.         '''Returns a handle to the database this connection belongs to.'''
  193.         return self._db
  194.  
  195.     
  196.     def isReadOnly(self):
  197.         '''Returns True if the storage for this connection is read only.'''
  198.         if self._opened is None:
  199.             raise ConnectionStateError('The database connection is closed')
  200.         
  201.         return self._storage.isReadOnly()
  202.  
  203.     
  204.     def invalidate(self, tid, oids):
  205.         """Notify the Connection that transaction 'tid' invalidated oids."""
  206.         self._inv_lock.acquire()
  207.         
  208.         try:
  209.             if self._txn_time is None:
  210.                 self._txn_time = tid
  211.             
  212.             self._invalidated.update(oids)
  213.         finally:
  214.             self._inv_lock.release()
  215.  
  216.  
  217.     
  218.     def root(self):
  219.         '''Return the database root object.'''
  220.         return self.get(z64)
  221.  
  222.     
  223.     def getVersion(self):
  224.         '''Returns the version this connection is attached to.'''
  225.         if self._storage is None:
  226.             raise ConnectionStateError('The database connection is closed')
  227.         
  228.         return self._version
  229.  
  230.     
  231.     def get_connection(self, database_name):
  232.         '''Return a Connection for the named database.'''
  233.         connection = self.connections.get(database_name)
  234.         if connection is None:
  235.             new_con = self._db.databases[database_name].open(transaction_manager = self.transaction_manager, mvcc = self._mvcc, version = self._version, synch = self._synch)
  236.             self.connections.update(new_con.connections)
  237.             new_con.connections = self.connections
  238.             connection = new_con
  239.         
  240.         return connection
  241.  
  242.     
  243.     def _implicitlyAdding(self, oid):
  244.         '''Are we implicitly adding an object within the current transaction
  245.  
  246.         This is used in a check to avoid implicitly adding an object
  247.         to a database in a multi-database situation.
  248.         See serialize.ObjectWriter.persistent_id.
  249.  
  250.         '''
  251.         if self._creating.get(oid, 0) and self._savepoint_storage is not None:
  252.             pass
  253.         return self._savepoint_storage.creating.get(oid, 0)
  254.  
  255.     
  256.     def sync(self):
  257.         '''Manually update the view on the database.'''
  258.         self.transaction_manager.abort()
  259.         self._storage_sync()
  260.  
  261.     
  262.     def getDebugInfo(self):
  263.         '''Returns a tuple with different items for debugging the
  264.         connection.
  265.         '''
  266.         return self._debug_info
  267.  
  268.     
  269.     def setDebugInfo(self, *args):
  270.         '''Add the given items to the debug information of this connection.'''
  271.         self._debug_info = self._debug_info + args
  272.  
  273.     
  274.     def getTransferCounts(self, clear = False):
  275.         '''Returns the number of objects loaded and stored.'''
  276.         res = (self._load_count, self._store_count)
  277.         if clear:
  278.             self._load_count = 0
  279.             self._store_count = 0
  280.         
  281.         return res
  282.  
  283.     
  284.     def abort(self, transaction):
  285.         '''Abort a transaction and forget all changes.'''
  286.         self._abort()
  287.         if self._savepoint_storage is not None:
  288.             self._abort_savepoint()
  289.         
  290.         self._tpc_cleanup()
  291.  
  292.     
  293.     def _abort(self):
  294.         '''Abort a transaction and forget all changes.'''
  295.         for obj in self._registered_objects:
  296.             oid = obj._p_oid
  297.             if not oid is not None:
  298.                 raise AssertionError
  299.             if oid in self._added:
  300.                 del self._added[oid]
  301.                 del obj._p_jar
  302.                 del obj._p_oid
  303.                 continue
  304.             self._cache.invalidate(oid)
  305.         
  306.  
  307.     
  308.     def _tpc_cleanup(self):
  309.         '''Performs cleanup operations to support tpc_finish and tpc_abort.'''
  310.         self._conflicts.clear()
  311.         if not self._synch:
  312.             self._flush_invalidations()
  313.         
  314.         self._needs_to_join = True
  315.         self._registered_objects = []
  316.         self._creating.clear()
  317.  
  318.     
  319.     def _flush_invalidations(self):
  320.         self._inv_lock.acquire()
  321.         
  322.         try:
  323.             invalidated = self._invalidated
  324.             self._invalidated = { }
  325.             self._txn_time = None
  326.         finally:
  327.             self._inv_lock.release()
  328.  
  329.         self._cache.invalidate(invalidated)
  330.         self._cache.incrgc()
  331.  
  332.     
  333.     def tpc_begin(self, transaction):
  334.         '''Begin commit of a transaction, starting the two-phase commit.'''
  335.         self._modified = []
  336.         self._creating.clear()
  337.         self._normal_storage.tpc_begin(transaction)
  338.  
  339.     
  340.     def commit(self, transaction):
  341.         '''Commit changes to an object'''
  342.         if self._savepoint_storage is not None:
  343.             self.savepoint()
  344.             self._commit_savepoint(transaction)
  345.         else:
  346.             self._commit(transaction)
  347.  
  348.     
  349.     def _commit(self, transaction):
  350.         '''Commit changes to an object'''
  351.         if self._import:
  352.             self._importDuringCommit(transaction, *self._import)
  353.             self._import = None
  354.         
  355.         self._added_during_commit = []
  356.         for obj in self._registered_objects:
  357.             oid = obj._p_oid
  358.             if not oid:
  359.                 raise AssertionError
  360.             if oid in self._conflicts:
  361.                 raise ReadConflictError(object = obj)
  362.             
  363.             if obj._p_jar is not self:
  364.                 raise InvalidObjectReference(obj, obj._p_jar)
  365.             elif oid in self._added:
  366.                 if not obj._p_serial == z64:
  367.                     raise AssertionError
  368.             elif obj._p_changed:
  369.                 if oid in self._invalidated:
  370.                     resolve = getattr(obj, '_p_resolveConflict', None)
  371.                     if resolve is None:
  372.                         raise ConflictError(object = obj)
  373.                     
  374.                 
  375.                 self._modified.append(oid)
  376.             
  377.             self._store_objects(ObjectWriter(obj), transaction)
  378.         
  379.         for obj in self._added_during_commit:
  380.             self._store_objects(ObjectWriter(obj), transaction)
  381.         
  382.         self._added_during_commit = None
  383.  
  384.     
  385.     def _store_objects(self, writer, transaction):
  386.         for obj in writer:
  387.             oid = obj._p_oid
  388.             serial = getattr(obj, '_p_serial', z64)
  389.             if serial == z64:
  390.                 implicitly_adding = self._added.pop(oid, None) is None
  391.                 self._creating[oid] = implicitly_adding
  392.             elif oid in self._invalidated and not hasattr(obj, '_p_resolveConflict'):
  393.                 raise ConflictError(object = obj)
  394.             
  395.             self._modified.append(oid)
  396.             p = writer.serialize(obj)
  397.             s = self._storage.store(oid, serial, p, self._version, transaction)
  398.             self._store_count += 1
  399.             
  400.             try:
  401.                 self._cache[oid] = obj
  402.             except:
  403.                 self
  404.                 if hasattr(obj, 'aq_base'):
  405.                     self._cache[oid] = obj.aq_base
  406.                 else:
  407.                     raise 
  408.  
  409.             self._handle_serial(s, oid)
  410.         
  411.  
  412.     
  413.     def _handle_serial(self, store_return, oid = None, change = 1):
  414.         '''Handle the returns from store() and tpc_vote() calls.'''
  415.         if not store_return:
  416.             return None
  417.         
  418.         if isinstance(store_return, str):
  419.             if not oid is not None:
  420.                 raise AssertionError
  421.             self._handle_one_serial(oid, store_return, change)
  422.         else:
  423.             for oid, serial in store_return:
  424.                 self._handle_one_serial(oid, serial, change)
  425.             
  426.  
  427.     
  428.     def _handle_one_serial(self, oid, serial, change):
  429.         if not isinstance(serial, str):
  430.             raise serial
  431.         
  432.         obj = self._cache.get(oid, None)
  433.         if obj is None:
  434.             return None
  435.         
  436.         if serial == ResolvedSerial:
  437.             del obj._p_changed
  438.         elif change:
  439.             obj._p_changed = 0
  440.         
  441.         obj._p_serial = serial
  442.  
  443.     
  444.     def tpc_abort(self, transaction):
  445.         if self._import:
  446.             self._import = None
  447.         
  448.         if self._savepoint_storage is not None:
  449.             self._abort_savepoint()
  450.         
  451.         self._storage.tpc_abort(transaction)
  452.         self._cache.invalidate(self._modified)
  453.         self._invalidate_creating()
  454.         while self._added:
  455.             (oid, obj) = self._added.popitem()
  456.             del obj._p_oid
  457.             del obj._p_jar
  458.         self._tpc_cleanup()
  459.  
  460.     
  461.     def _invalidate_creating(self, creating = None):
  462.         '''Disown any objects newly saved in an uncommitted transaction.'''
  463.         if creating is None:
  464.             creating = self._creating
  465.             self._creating = { }
  466.         
  467.         for oid in creating:
  468.             o = self._cache.get(oid)
  469.             if o is not None:
  470.                 del self._cache[oid]
  471.                 del o._p_jar
  472.                 del o._p_oid
  473.                 continue
  474.         
  475.  
  476.     
  477.     def tpc_vote(self, transaction):
  478.         '''Verify that a data manager can commit the transaction.'''
  479.         
  480.         try:
  481.             vote = self._storage.tpc_vote
  482.         except AttributeError:
  483.             return None
  484.  
  485.         s = vote(transaction)
  486.         self._handle_serial(s)
  487.  
  488.     
  489.     def tpc_finish(self, transaction):
  490.         '''Indicate confirmation that the transaction is done.'''
  491.         
  492.         def callback(tid):
  493.             d = dict.fromkeys(self._modified)
  494.             self._db.invalidate(tid, d, self)
  495.  
  496.         self._storage.tpc_finish(transaction, callback)
  497.         self._tpc_cleanup()
  498.  
  499.     
  500.     def sortKey(self):
  501.         '''Return a consistent sort key for this connection.'''
  502.         return '%s:%s' % (self._storage.sortKey(), id(self))
  503.  
  504.     
  505.     def beforeCompletion(self, txn):
  506.         pass
  507.  
  508.     
  509.     def _storage_sync(self, *ignored):
  510.         sync = getattr(self._storage, 'sync', 0)
  511.         if sync:
  512.             sync()
  513.         
  514.         self._flush_invalidations()
  515.  
  516.     afterCompletion = _storage_sync
  517.     newTransaction = _storage_sync
  518.     
  519.     def oldstate(self, obj, tid):
  520.         """Return copy of 'obj' that was written by transaction 'tid'."""
  521.         if not obj._p_jar is self:
  522.             raise AssertionError
  523.         p = self._storage.loadSerial(obj._p_oid, tid)
  524.         return self._reader.getState(p)
  525.  
  526.     
  527.     def setstate(self, obj):
  528.         """Turns the ghost 'obj' into a real object by loading it's from the
  529.         database."""
  530.         oid = obj._p_oid
  531.         if self._opened is None:
  532.             msg = "Shouldn't load state for %s when the connection is closed" % oid_repr(oid)
  533.             self._log.error(msg)
  534.             raise ConnectionStateError(msg)
  535.         
  536.         
  537.         try:
  538.             self._setstate(obj)
  539.         except ConflictError:
  540.             raise 
  541.         except:
  542.             self._log.error("Couldn't load state for %s", oid_repr(oid), exc_info = sys.exc_info())
  543.             raise 
  544.  
  545.  
  546.     
  547.     def _setstate(self, obj):
  548.         if obj._p_oid in self._invalidated and not myhasattr(obj, '_p_independent'):
  549.             self._load_before_or_conflict(obj)
  550.             return None
  551.         
  552.         (p, serial) = self._storage.load(obj._p_oid, self._version)
  553.         self._load_count += 1
  554.         self._inv_lock.acquire()
  555.         
  556.         try:
  557.             invalid = obj._p_oid in self._invalidated
  558.         finally:
  559.             self._inv_lock.release()
  560.  
  561.         if invalid:
  562.             if myhasattr(obj, '_p_independent'):
  563.                 self._handle_independent(obj)
  564.             else:
  565.                 self._load_before_or_conflict(obj)
  566.                 return None
  567.         
  568.         self._reader.setGhostState(obj, p)
  569.         obj._p_serial = serial
  570.  
  571.     
  572.     def _load_before_or_conflict(self, obj):
  573.         '''Load non-current state for obj or raise ReadConflictError.'''
  574.         if not self._mvcc and self._setstate_noncurrent(obj):
  575.             self._register(obj)
  576.             self._conflicts[obj._p_oid] = True
  577.             raise ReadConflictError(object = obj)
  578.         
  579.  
  580.     
  581.     def _setstate_noncurrent(self, obj):
  582.         '''Set state using non-current data.
  583.  
  584.         Return True if state was available, False if not.
  585.         '''
  586.         
  587.         try:
  588.             t = self._storage.loadBefore(obj._p_oid, self._txn_time)
  589.         except KeyError:
  590.             return False
  591.  
  592.         if t is None:
  593.             return False
  594.         
  595.         (data, start, end) = t
  596.         if not start < self._txn_time:
  597.             raise AssertionError, (u64(start), u64(self._txn_time))
  598.         if not end is not None:
  599.             raise AssertionError
  600.         if not self._txn_time <= end:
  601.             raise AssertionError, (u64(self._txn_time), u64(end))
  602.         self._reader.setGhostState(obj, data)
  603.         obj._p_serial = start
  604.         return True
  605.  
  606.     
  607.     def _handle_independent(self, obj):
  608.         if obj._p_independent():
  609.             self._inv_lock.acquire()
  610.             
  611.             try:
  612.                 del self._invalidated[obj._p_oid]
  613.             except KeyError:
  614.                 pass
  615.             finally:
  616.                 self._inv_lock.release()
  617.  
  618.         else:
  619.             self._conflicts[obj._p_oid] = 1
  620.             self._register(obj)
  621.             raise ReadConflictError(object = obj)
  622.  
  623.     
  624.     def register(self, obj):
  625.         '''Register obj with the current transaction manager.
  626.  
  627.         A subclass could override this method to customize the default
  628.         policy of one transaction manager for each thread.
  629.  
  630.         obj must be an object loaded from this Connection.
  631.         '''
  632.         if not obj._p_jar is self:
  633.             raise AssertionError
  634.         if obj._p_oid is None:
  635.             raise ValueError('assigning to _p_jar is not supported')
  636.         elif obj._p_oid in self._added:
  637.             return None
  638.         
  639.         self._register(obj)
  640.  
  641.     
  642.     def _register(self, obj = None):
  643.         if self._needs_to_join:
  644.             self.transaction_manager.get().join(self)
  645.             self._needs_to_join = False
  646.         
  647.         if obj is not None:
  648.             self._registered_objects.append(obj)
  649.         
  650.  
  651.     
  652.     def _cache_items(self):
  653.         items = self._cache.lru_items()
  654.         everything = self._cache.cache_data
  655.         for k, v in items:
  656.             del everything[k]
  657.         
  658.         return everything.items() + items
  659.  
  660.     
  661.     def open(self, transaction_manager = None, mvcc = True, synch = True, delegate = True):
  662.         '''Register odb, the DB that this Connection uses.
  663.  
  664.         This method is called by the DB every time a Connection
  665.         is opened.  Any invalidations received while the Connection
  666.         was closed will be processed.
  667.  
  668.         If the global module function resetCaches() was called, the
  669.         cache will be cleared.
  670.  
  671.         Parameters:
  672.         odb: database that owns the Connection
  673.         mvcc: boolean indicating whether MVCC is enabled
  674.         transaction_manager: transaction manager to use.  None means
  675.             use the default transaction manager.
  676.         synch: boolean indicating whether Connection should
  677.         register for afterCompletion() calls.
  678.         '''
  679.         self._opened = time()
  680.         self._synch = synch
  681.         if mvcc:
  682.             pass
  683.         self._mvcc = not (self._version)
  684.         if transaction_manager is None:
  685.             transaction_manager = transaction.manager
  686.         
  687.         self.transaction_manager = transaction_manager
  688.         if self._reset_counter != global_reset_counter:
  689.             self._resetCache()
  690.         else:
  691.             self._flush_invalidations()
  692.         if synch:
  693.             transaction_manager.registerSynch(self)
  694.         
  695.         if self._cache is not None:
  696.             self._cache.incrgc()
  697.         
  698.         if delegate:
  699.             for connection in self.connections.values():
  700.                 if connection is not self:
  701.                     connection.open(transaction_manager, mvcc, synch, False)
  702.                     continue
  703.             
  704.         
  705.  
  706.     
  707.     def _resetCache(self):
  708.         '''Creates a new cache, discarding the old one.
  709.  
  710.         See the docstring for the resetCaches() function.
  711.         '''
  712.         self._reset_counter = global_reset_counter
  713.         self._invalidated.clear()
  714.         cache_size = self._cache.cache_size
  715.         self._cache = cache = PickleCache(self, cache_size)
  716.  
  717.     
  718.     def __repr__(self):
  719.         if self._version:
  720.             ver = ' (in version %s)' % `self._version`
  721.         else:
  722.             ver = ''
  723.         return '<Connection at %08x%s>' % (positive_id(self), ver)
  724.  
  725.     __getitem__ = get
  726.     
  727.     def modifiedInVersion(self, oid):
  728.         """Returns the version the object with the given oid was modified in.
  729.  
  730.         If it wasn't modified in a version, the current version of this
  731.         connection is returned.
  732.         """
  733.         
  734.         try:
  735.             return self._db.modifiedInVersion(oid)
  736.         except KeyError:
  737.             return self.getVersion()
  738.  
  739.  
  740.     
  741.     def exchange(self, old, new):
  742.         oid = old._p_oid
  743.         new._p_oid = oid
  744.         new._p_jar = self
  745.         new._p_changed = 1
  746.         self._register(new)
  747.         self._cache[oid] = new
  748.  
  749.     
  750.     def savepoint(self):
  751.         if self._savepoint_storage is None:
  752.             self._savepoint_storage = TmpStore(self._version, self._normal_storage)
  753.             self._storage = self._savepoint_storage
  754.         
  755.         self._creating.clear()
  756.         self._commit(None)
  757.         self._storage.creating.update(self._creating)
  758.         self._creating.clear()
  759.         self._registered_objects = []
  760.         state = (self._storage.position, self._storage.index.copy())
  761.         result = Savepoint(self, state)
  762.         self.cacheGC()
  763.         return result
  764.  
  765.     
  766.     def _rollback(self, state):
  767.         self._abort()
  768.         self._registered_objects = []
  769.         src = self._storage
  770.         self._cache.invalidate(src.index)
  771.         src.reset(*state)
  772.  
  773.     
  774.     def _commit_savepoint(self, transaction):
  775.         '''Commit all changes made in subtransactions and begin 2-phase commit
  776.         '''
  777.         src = self._savepoint_storage
  778.         self._storage = self._normal_storage
  779.         self._savepoint_storage = None
  780.         self._log.debug('Commiting savepoints of size %s', src.getSize())
  781.         oids = src.index.keys()
  782.         self._modified.extend(oids)
  783.         self._creating.update(src.creating)
  784.         for oid in oids:
  785.             (data, serial) = src.load(oid, src)
  786.             s = self._storage.store(oid, serial, data, self._version, transaction)
  787.             self._handle_serial(s, oid, change = False)
  788.         
  789.         src.close()
  790.  
  791.     
  792.     def _abort_savepoint(self):
  793.         '''Discard all subtransaction data.'''
  794.         src = self._savepoint_storage
  795.         self._storage = self._normal_storage
  796.         self._savepoint_storage = None
  797.         self._cache.invalidate(src.index)
  798.         self._invalidate_creating(src.creating)
  799.         src.close()
  800.  
  801.  
  802.  
  803. class Savepoint:
  804.     implements(IDataManagerSavepoint)
  805.     
  806.     def __init__(self, datamanager, state):
  807.         self.datamanager = datamanager
  808.         self.state = state
  809.  
  810.     
  811.     def rollback(self):
  812.         self.datamanager._rollback(self.state)
  813.  
  814.  
  815.  
  816. class TmpStore:
  817.     '''A storage-like thing to support savepoints.'''
  818.     
  819.     def __init__(self, base_version, storage):
  820.         self._storage = storage
  821.         for method in ('getName', 'new_oid', 'modifiedInVersion', 'getSize', 'undoLog', 'versionEmpty', 'sortKey', 'loadBefore'):
  822.             setattr(self, method, getattr(storage, method))
  823.         
  824.         self._base_version = base_version
  825.         self._file = tempfile.TemporaryFile()
  826.         self.position = 0x0L
  827.         self.index = { }
  828.         self.creating = { }
  829.  
  830.     
  831.     def __len__(self):
  832.         return len(self.index)
  833.  
  834.     
  835.     def close(self):
  836.         self._file.close()
  837.  
  838.     
  839.     def load(self, oid, version):
  840.         pos = self.index.get(oid)
  841.         if pos is None:
  842.             return self._storage.load(oid, self._base_version)
  843.         
  844.         self._file.seek(pos)
  845.         h = self._file.read(8)
  846.         oidlen = u64(h)
  847.         read_oid = self._file.read(oidlen)
  848.         if read_oid != oid:
  849.             raise POSException.StorageSystemError('Bad temporary storage')
  850.         
  851.         h = self._file.read(16)
  852.         size = u64(h[8:])
  853.         serial = h[:8]
  854.         return (self._file.read(size), serial)
  855.  
  856.     
  857.     def store(self, oid, serial, data, version, transaction):
  858.         if not version == self._base_version:
  859.             raise AssertionError
  860.         self._file.seek(self.position)
  861.         l = len(data)
  862.         if serial is None:
  863.             serial = z64
  864.         
  865.         header = p64(len(oid)) + oid + serial + p64(l)
  866.         self._file.write(header)
  867.         self._file.write(data)
  868.         self.index[oid] = self.position
  869.         self.position += l + len(header)
  870.         return serial
  871.  
  872.     
  873.     def reset(self, position, index):
  874.         self._file.truncate(position)
  875.         self.position = position
  876.         self.index = index.copy()
  877.  
  878.  
  879.